package com.atguigu.flinkSql;

import com.atguigu.been.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @author wky
 * @create 2021-07-21-9:21
 */

// Demo: using processing time as the time attribute of a Flink SQL table.
public class Flink11_Sql_ProcessTime {
    // DDL for the "sensor" table: three declared columns (id, ts, vc) plus pt_time,
    // a computed column defined as PROCTIME(). The table is backed by a CSV file read
    // through the filesystem connector.
    // Be careful not to add an extra comma or an extra single quote in the option list.
    private static final String SENSOR_TABLE_DDL =
            "create table sensor (id string, ts bigint, vc int, "
                    + "pt_time as PROCTIME()) with("
                    + "'connector' = 'filesystem',"
                    + "'path' = 'src/input/sensor_sql.txt',"
                    + "'format'='csv'"
                    + ")";

    // Query whose result is printed by main().
    private static final String SELECT_ALL_SENSORS = "select * from sensor";

    /**
     * Registers the {@code sensor} table with a processing-time column declared directly
     * in the SQL DDL, then runs {@code select * from sensor} and prints the result.
     *
     * <p>Note carried over from the original author: the printed processing time was
     * observed in UTC (Greenwich time) rather than UTC+8, i.e. local time minus eight
     * hours (example value: 2021-07-21T11:03:10.812). NOTE(review): this presumably
     * depends on the Flink version and session time-zone settings; confirm.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Streaming execution environment, limited to a single parallel task.
        final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);

        // Table environment layered on top of the streaming environment.
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        // Register the table first, then query it and print every row.
        tableEnv.executeSql(SENSOR_TABLE_DDL);
        tableEnv.executeSql(SELECT_ALL_SENSORS).print();
    }
}
